package cAmqp

import (
	"fmt"

	"github.com/gin-gonic/gin"
	"github.com/rabbitmq/amqp091-go"

	"gitee.com/csingo/cLog"
)

// Consume sets up the AMQP topology described by the driver configuration and
// then runs the consumer selected by option.ConsumeType.
//
// Steps, in order:
//  1. obtain a consumer channel via self.channel;
//  2. declare the exchange and the queue, then bind them together;
//  3. invoke option.BindHandler with true (if binding is never reached, the
//     deferred cleanup invokes it with false instead);
//  4. invoke option.StartHandler, aborting if it returns an error;
//  5. run basicConsume or basicGet, re-acquiring the channel and retrying for
//     as long as either step reports an error;
//  6. delete the queue / exchange when the corresponding AutoDeleted option is
//     set, then invoke option.FinishHandler.
//
// A panic raised while the function runs is recovered, logged together with
// traces(), and returned as err. A failed queue/exchange deletion is logged
// and does not stop the function; its error is returned only when nothing
// later overwrites err.
func Consume(ctx *gin.Context, option ConsumeOption) (err error) {
	var bind bool
	var channel *amqp091.Channel
	var driver *AmqpConf_Driver
	var name string

	defer func() {
		if r := recover(); r != nil {
			cLog.WithContext(ctx, map[string]any{
				"source": "cAmqp.Consume",
				"traces": traces(),
				"panic":  fmt.Sprintf("%v", r),
			}).Error("AMQP 消费者执行失败")
			// %v (not %s): a panic value may be of any type, and %s renders
			// non-string values as "%!s(T=...)".
			err = fmt.Errorf("%v", r)
		}

		// Binding never succeeded: report the failure to the bind handler.
		if !bind && option.BindHandler != nil {
			runBindHandlerFunc(ctx, bind, option.Params, option.BindHandler)
		}
	}()

	name, channel, driver, err = self.channel(ctx, option.DriverName, option.DriverConf, ConnectionType_Consumer)
	if err != nil {
		cLog.WithContext(ctx, map[string]any{
			"source": "cAmqp.Consume",
			"option": option,
			"err":    err.Error(),
		}).Error("cAmqp.Consume:获取channel异常")
		return err
	}
	// The closure reads driver and name when it runs, so it closes whichever
	// connection was committed last (see the retry loop below).
	defer func() {
		self.close(ctx, driver.Connection, name)
	}()

	// Auto-delete is always passed as false at declaration time;
	// option.Exchange.AutoDeleted is honored by the explicit ExchangeDelete
	// after consumption finishes.
	err = channel.ExchangeDeclare(
		driver.ExchangeName,
		string(driver.ExchangeType),
		option.Exchange.Durable,
		false,
		option.Exchange.Internal,
		option.Exchange.NoWait,
		option.Exchange.Arguments,
	)
	if err != nil {
		return err
	}

	// Same as above: option.Queue.AutoDeleted is honored by the explicit
	// QueueDelete after consumption finishes.
	_, err = channel.QueueDeclare(
		driver.QueueName,
		option.Queue.Durable,
		false,
		option.Queue.Exclusive,
		option.Queue.NoWait,
		option.Queue.Arguments,
	)
	if err != nil {
		return err
	}

	err = channel.QueueBind(driver.QueueName, driver.RoutingKey, driver.ExchangeName, option.Bind.NoWait, option.Bind.Arguments)
	if err != nil {
		return err
	}

	// Exchange and queue are bound: notify the bind handler of the success.
	bind = true
	if option.BindHandler != nil {
		runBindHandlerFunc(ctx, bind, option.Params, option.BindHandler)
	}

	// Fall back to the default handler when no consume handler was supplied.
	if option.Handler == nil {
		option.Handler = defaultHandler
	}

	// Consumption is about to start: run the start callback.
	if option.StartHandler != nil {
		err = runParamsHandlerFunc(ctx, option.Params, option.StartHandler)
		if err != nil {
			cLog.WithContext(ctx, map[string]any{
				"source": "cAmqp.Consume",
				"params": option.Params,
				"err":    err.Error(),
			}).Error("AMQP 消费者 StartHandler 执行失败")
			return err
		}
	}

	// NOTE(review): this loop retries immediately and has no exit condition
	// other than success; whether self.channel blocks or backs off internally
	// is not visible here — confirm it cannot spin.
	for {
		// Commit the new handles only on success, so that a failed attempt
		// does not clobber the driver/name the deferred close relies on.
		nextName, nextChannel, nextDriver, chErr := self.channel(ctx, option.DriverName, option.DriverConf, ConnectionType_Consumer)
		if chErr != nil {
			err = chErr
			cLog.WithContext(ctx, map[string]any{
				"source": "cAmqp.Consume",
				"option": option,
				"err":    chErr.Error(),
			}).Error("cAmqp.Consume:获取channel异常")
			continue
		}
		name, channel, driver, err = nextName, nextChannel, nextDriver, nil

		// NOTE(review): a ConsumeType matching neither case leaves err nil,
		// so the loop ends without consuming anything.
		switch option.ConsumeType {
		case ConsumeType_BasicConsume:
			err = basicConsume(ctx, channel, driver, option)
		case ConsumeType_BasicGet:
			err = basicGet(ctx, channel, driver, option)
		}

		if err == nil {
			break
		}
		cLog.WithContext(ctx, map[string]any{
			"source": "cAmqp.Consume",
			"driver": driver,
			"option": option,
			"err":    err.Error(),
		}).Error("AMQP 消费者 消费异常")
	}

	// Delete the queue when auto-delete was requested.
	// NOTE(review): a failure here is only logged, yet err stays set and is
	// returned unless a later step overwrites it — confirm this is intended.
	if option.Queue.AutoDeleted {
		_, err = channel.QueueDelete(driver.QueueName, true, true, false)
		if err != nil {
			cLog.WithContext(ctx, map[string]any{
				"source": "cAmqp.Consume",
				"driver": driver,
				"err":    err.Error(),
			}).Error("AMQP 消费者删除queue异常")
		}
	}
	// Delete the exchange when auto-delete was requested.
	if option.Exchange.AutoDeleted {
		err = channel.ExchangeDelete(driver.ExchangeName, true, false)
		if err != nil {
			cLog.WithContext(ctx, map[string]any{
				"source": "cAmqp.Consume",
				"driver": driver,
				"err":    err.Error(),
			}).Error("AMQP 消费者删除exchange异常")
		}
	}

	// Consumption finished: run the finish callback.
	if option.FinishHandler != nil {
		err = runParamsHandlerFunc(ctx, option.Params, option.FinishHandler)
		if err != nil {
			cLog.WithContext(ctx, map[string]any{
				"source": "cAmqp.Consume",
				"err":    err.Error(),
				"params": option.Params,
			}).Error("AMQP 消费者 FinishHandler 执行失败")
			return err
		}
	}

	return err
}
